# 007_001_lab_DQN 1 (NIPS 2013)
# You will implement the algorithm presented in the 2013 paper
# DQN (NIPS 2013): Playing Atari with Deep Reinforcement Learning
# You initialize everything as follows:
# You initialize replay memory D
# You initialize the "action-value function Q" with random weights
# You perform preprocessing:
# you take state $$$s_{1}$$$
# and convert it into the shape you want (see the sketch after this comment block)
# img 2018-04-29 13-12-02.png
#
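# A minimal sketch (illustration only; the names below are placeholders, not part of
# the lab code) of the initialization steps above, assuming a CartPole-style vector
# state rather than Atari frames:
import numpy as np
from collections import deque

D = deque(maxlen=50000)                  # replay memory D with a fixed capacity
first_state = np.zeros(4)                # stand-in for the first observation s1
phi_1 = np.reshape(first_state, [1, 4])  # "preprocess" s1 into the network's input shape
#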
# You use epsilon-greedy to select an "action":
# if a random value is less than epsilon, you select an action randomly;
# otherwise, you pass the state to the mainDQN network and take its greedy action
# You use a buffer to store experience data
# (state, action, reward, next_state, done)
# You create mini-batches by using random.sample()
# Then you train the network
# You should define the target $$$y_{j}$$$
# You should do this by dividing into two cases (done vs. not done), as sketched below:
# $$$y_{j} = r_{j}$$$ if the episode terminates at step $$$j+1$$$,
# otherwise $$$y_{j} = r_{j} + \gamma \max_{a'} Q(s_{j+1}, a')$$$
# img 2018-04-29 13-14-13.png
#
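# A minimal, self-contained sketch (illustration only; `q_values` and `next_q_values`
# stand in for network outputs) of the epsilon-greedy choice and the two-case target
# described above:
import random

def epsilon_greedy(q_values, epsilon, n_actions):
    """Pick a random action with probability epsilon, otherwise the greedy action."""
    if random.random() < epsilon:
        return random.randrange(n_actions)
    return max(range(n_actions), key=lambda a: q_values[a])

def td_target(reward, next_q_values, done, discount=0.99):
    """y_j = r_j if the episode ended, otherwise r_j + discount * max_a' Q(s', a')."""
    if done:
        return reward
    return reward + discount * max(next_q_values)
#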
"""
DQN (NIPS 2013)
Playing Atari with Deep Reinforcement Learning
https://www.cs.toronto.edu/~vmnih/docs/dqn.pdf
"""
import numpy as np
import tensorflow as tf
import random
import dqn
import gym
from collections import deque
env = gym.make('CartPole-v0')
env = gym.wrappers.Monitor(env, 'gym-results/', force=True)
# This is the size of the input data (state): 4 for CartPole
INPUT_SIZE = env.observation_space.shape[0]
# This is the size of the output data (number of actions): 2 for CartPole
OUTPUT_SIZE = env.action_space.n
DISCOUNT_RATE = 0.99
REPLAY_MEMORY = 50000
MAX_EPISODE = 5000
BATCH_SIZE = 64
# minimum epsilon for epsilon greedy
MIN_E = 0.0
# epsilon will be `MIN_E` at `EPSILON_DECAYING_EPISODE`
EPSILON_DECAYING_EPISODE = MAX_EPISODE * 0.01
def bot_play(mainDQN: dqn.DQN) -> None:
    """Runs a single episode with rendering and prints the total reward
    Args:
        mainDQN (dqn.DQN): DQN Agent
    """
    state = env.reset()
    total_reward = 0
    while True:
        env.render()
        action = np.argmax(mainDQN.predict(state))
        state, reward, done, _ = env.step(action)
        total_reward += reward
        if done:
            print("Total score: {}".format(total_reward))
            break
def train_minibatch(DQN: dqn.DQN, train_batch: list) -> float:
    """Prepare X_batch, y_batch and train on them
    Recall the target is
        target = reward + discount * max Q(s', a')
        or       reward                              if done
    and the loss function is [target - Q(s, a)]^2
    Hence,
        X_batch is a list of states
        y_batch is reward + discount * max Q(s', a')
                or reward if the episode terminated
    Args:
        DQN (dqn.DQN): DQN Agent to train & run
        train_batch (list): Minibatch from the replay memory
            Each element is a tuple of (s, a, r, s', done)
    Returns:
        loss: Returns a loss
    """
    state_array = np.vstack([x[0] for x in train_batch])
    action_array = np.array([x[1] for x in train_batch])
    reward_array = np.array([x[2] for x in train_batch])
    next_state_array = np.vstack([x[3] for x in train_batch])
    done_array = np.array([x[4] for x in train_batch])

    X_batch = state_array
    y_batch = DQN.predict(state_array)

    # Two-case target in vectorized form: `~done_array` zeroes out the bootstrap term
    # for terminal transitions, leaving y = reward there.
    Q_target = reward_array + DISCOUNT_RATE * np.max(DQN.predict(next_state_array), axis=1) * ~done_array
    y_batch[np.arange(len(X_batch)), action_array] = Q_target

    # Train our network using target and predicted Q values on this minibatch
    loss, _ = DQN.update(X_batch, y_batch)
    return loss
def annealing_epsilon(episode: int, min_e: float, max_e: float, target_episode: int) -> float:
    """Return a linearly annealed epsilon
    Epsilon decreases linearly over time until it reaches `min_e` at `target_episode`
    (epsilon)
        |
    max_e ---|\
             | \
             |  \
             |   \
    min_e ---|____\_______________ (episode)
             |
             target_episode
    slope = (min_e - max_e) / target_episode
    intercept = max_e
    e = slope * episode + intercept
    Args:
        episode (int): Current episode
        min_e (float): Minimum epsilon
        max_e (float): Maximum epsilon
        target_episode (int): epsilon becomes `min_e` at `target_episode`
    Returns:
        float: epsilon between `min_e` and `max_e`
    """
    slope = (min_e - max_e) / target_episode
    intercept = max_e
    return max(min_e, slope * episode + intercept)
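# Quick sanity check (illustrative): with EPSILON_DECAYING_EPISODE = 50 (i.e. 1% of
# MAX_EPISODE), epsilon starts at 1.0, reaches MIN_E at episode 50, and stays there.
assert annealing_epsilon(0, 0.0, 1.0, 50) == 1.0
assert annealing_epsilon(50, 0.0, 1.0, 50) == 0.0
assert annealing_epsilon(100, 0.0, 1.0, 50) == 0.0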
def main():
    # store the previous observations in replay memory
    replay_buffer = deque(maxlen=REPLAY_MEMORY)
    last_100_game_reward = deque(maxlen=100)

    with tf.Session() as sess:
        mainDQN = dqn.DQN(sess, INPUT_SIZE, OUTPUT_SIZE)
        init = tf.global_variables_initializer()
        sess.run(init)

        for episode in range(MAX_EPISODE):
            e = annealing_epsilon(episode, MIN_E, 1.0, EPSILON_DECAYING_EPISODE)
            done = False
            state = env.reset()
            step_count = 0

            while not done:
                # epsilon-greedy action selection
                if np.random.rand() < e:
                    action = env.action_space.sample()
                else:
                    action = np.argmax(mainDQN.predict(state))

                next_state, reward, done, _ = env.step(action)

                if done:
                    # penalize the terminal transition so the agent learns to avoid it
                    reward = -1

                replay_buffer.append((state, action, reward, next_state, done))
                state = next_state
                step_count += 1

                # once enough experience is collected, train on a random minibatch
                if len(replay_buffer) > BATCH_SIZE:
                    minibatch = random.sample(replay_buffer, BATCH_SIZE)
                    train_minibatch(mainDQN, minibatch)

            print("[Episode {:>5}] steps: {:>5} e: {:>5.2f}".format(episode, step_count, e))

            # CartPole-v0 clear condition: average reward over the last 100 episodes > 199
            last_100_game_reward.append(step_count)
            if len(last_100_game_reward) == last_100_game_reward.maxlen:
                avg_reward = np.mean(last_100_game_reward)
                if avg_reward > 199.0:
                    print("Game Cleared within {} episodes with avg reward {}".format(episode, avg_reward))
                    break


if __name__ == "__main__":
    main()
# @
# Implement replay memory
# You simply use a deque() to store and retrieve experiences
# img 2018-04-29 13-43-06.png
#
# You can keep the buffer at a fixed size by using popleft() (see the sketch below)
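# A minimal sketch (illustration only) of keeping the replay memory at a fixed size:
# either rely on deque(maxlen=...) to discard old entries automatically, or call
# popleft() yourself when the buffer is full, as below.
from collections import deque

demo_buffer = deque()
DEMO_MAX_SIZE = 3
for experience in [(0, 0, 1.0, 1, False), (1, 1, 1.0, 2, False),
                   (2, 0, 1.0, 3, False), (3, 1, -1.0, 4, True)]:
    if len(demo_buffer) >= DEMO_MAX_SIZE:
        demo_buffer.popleft()        # drop the oldest experience
    demo_buffer.append(experience)   # demo_buffer keeps only the 3 most recent tuples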
# @
# You train the model with values sampled from the replay memory (see the sketch below)
# img 2018-04-29 14-04-31.png
#
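# A minimal sketch (illustration only; `demo_memory` holds dummy transitions) of the
# training step described above: once the buffer is larger than the batch size, draw a
# random minibatch with random.sample() and train on it.
import random

demo_memory = [([0.0] * 4, 0, 1.0, [0.1] * 4, False) for _ in range(200)]
if len(demo_memory) > 64:
    demo_minibatch = random.sample(demo_memory, 64)   # uncorrelated (s, a, r, s', done) tuples
    # train_minibatch(mainDQN, demo_minibatch)        # as in the lab code above
#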
# @
# Summary (a condensed skeleton follows this list)
# 1. You build the network and initialize it
# 1. You build the environment
# 1. You run a loop; in each step,
#    you get an "action" in one of several ways (random or greedy),
#    you apply the obtained "action" and observe values (reward, new state, done or not done)
# 1. You store the above values in the buffer
#    and keep running the loop
# 1. At some point (e.g., once every 10 loops, or every step as in the code above),
#    you sample values randomly from the buffer
# 1. You train the model on the randomly sampled values
# 1. You keep running the loop
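# A condensed, self-contained skeleton (illustration only; `agent.act` and `agent.train`
# are hypothetical stand-ins for the epsilon-greedy choice and train_minibatch above)
# of the loop summarized in this list:
def dqn_training_loop(agent, environment, episodes, batch_size=64, memory_size=50000):
    import random
    from collections import deque

    memory = deque(maxlen=memory_size)
    for _ in range(episodes):
        state, done = environment.reset(), False
        while not done:
            action = agent.act(state)                               # epsilon-greedy inside the agent
            next_state, reward, done, _ = environment.step(action)  # apply the action
            memory.append((state, action, reward, next_state, done))
            state = next_state
            if len(memory) > batch_size:
                agent.train(random.sample(memory, batch_size))      # train on a random minibatch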